/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef EXTENSIONS_COAP_SERVER_COAPSERVER_H_
#define EXTENSIONS_COAP_SERVER_COAPSERVER_H_

#include "core/Connectable.h"
#include "coap_server.h"
#include "coap_message.h"
#include <coap2/coap.h>
#include <functional>
#include <thread>
#include <future>

namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace coap {

enum class Method {
  Get,
  Post,
  Put,
  Delete
};

/**
 * CoapQuery is the request message sent by the client
 */
class CoapQuery {
 public:
  CoapQuery(const std::string &query, std::unique_ptr<CoapMessage, decltype(&free_coap_message)> message)
      : query_(query),
        message_(std::move(message)) {

  }
  virtual ~CoapQuery() {
  }
  CoapQuery(const CoapQuery &qry) = delete;
  CoapQuery(CoapQuery &&qry) = default;

 private:
  std::string query_;
  std::unique_ptr<CoapMessage, decltype(&free_coap_message)> message_;
};

/**
 * Coap Response that is generated by the CoapServer to the library
 */
class CoapResponse {
  friend class CoapQuery;
 public:
  CoapResponse(int code, std::unique_ptr<uint8_t> data, size_t size)
      : code_(code),
        data_(std::move(data)),
        size_(size) {

  }

  int getCode() const {
    return code_;
  }
  size_t getSize() const {
    return size_;
  }

  uint8_t * const getData() const {
    return data_.get();
  }

  CoapResponse(const CoapResponse &qry) = delete;
  CoapResponse(CoapResponse &&qry) = default;
  CoapResponse &operator=(CoapResponse &&qry) = default;
 private:
  int code_;
  std::unique_ptr<uint8_t> data_;
  size_t size_;
};

/**
 * Wrapper for the coap server functionality using an async callback to perform
 * custom operations. Intended to be used for testing, but may provide capabilities
 * elsewhere.
 */
class CoapServer : public core::Connectable {
 public:
  explicit CoapServer(const std::string &name, const utils::Identifier &uuid)
      : core::Connectable(name, uuid),
        server_(nullptr),
        port_(0) {
    //TODO: this allows this class to be instantiated via the the class loader
    //need to define this capability in the future.
  }
  CoapServer(const std::string &hostname, uint16_t port)
      : core::Connectable(hostname),
        hostname_(hostname),
        server_(nullptr),
        port_(port) {
    coap_startup();
    auto port_str = std::to_string(port_);
    server_ = create_server(hostname_.c_str(), port_str.c_str());
  }

  virtual ~CoapServer();

  void start() {
    running_ = true;

    future = std::async(std::launch::async, [&]() -> uint64_t {
      while (running_) {
        int res = coap_run_once(server_->ctx, 100);
        if (res < 0 ) {
          break;
        }
        coap_check_notify(server_->ctx);
      }
      return 0;

    });
  }

  void add_endpoint(const std::string &path, Method method, std::function<CoapResponse(CoapQuery)> functor) {
    unsigned char mthd = COAP_REQUEST_POST;
    switch (method) {
      case Method::Get:
        mthd = COAP_REQUEST_GET;
        break;
      case Method::Post:
        mthd = COAP_REQUEST_POST;
        break;
      case Method::Put:
        mthd = COAP_REQUEST_PUT;
        break;
      case Method::Delete:
        mthd = COAP_REQUEST_DELETE;
        break;
    }
    auto current_endpoint = endpoints_.find(path);
    if (current_endpoint != endpoints_.end()) {
      ::add_endpoint(current_endpoint->second, mthd, handle_response_with_passthrough);
    } else {
      CoapEndpoint * const endpoint = create_endpoint(server_, path.c_str(), mthd, handle_response_with_passthrough);
      functions_.insert(std::make_pair(endpoint->resource, functor));
      endpoints_.insert(std::make_pair(path, endpoint));
    }
  }

  void add_endpoint(Method method, std::function<CoapResponse(CoapQuery)> functor) {
    unsigned char mthd = COAP_REQUEST_POST;
    switch (method) {
      case Method::Get:
        mthd = COAP_REQUEST_GET;
        break;
      case Method::Post:
        mthd = COAP_REQUEST_POST;
        break;
      case Method::Put:
        mthd = COAP_REQUEST_PUT;
        break;
      case Method::Delete:
        mthd = COAP_REQUEST_DELETE;
        break;
    }
    CoapEndpoint * const endpoint = create_endpoint(server_, NULL, mthd, handle_response_with_passthrough);
    functions_.insert(std::make_pair(endpoint->resource, functor));
    endpoints_.insert(std::make_pair("", endpoint));
  }

  /**
   * Determines if we are connected and operating
   */
  virtual bool isRunning() {
    return running_.load();
  }

  /**
   * Block until work is available on any input connection, or the given duration elapses
   * @param timeoutMs timeout in milliseconds
   */
  void waitForWork(uint64_t timeoutMs);

  virtual void yield() {

  }

  /**
   * Determines if work is available by this connectable
   * @return boolean if work is available.
   */
  virtual bool isWorkAvailable() {
    return true;
  }

 protected:

  static void handle_response_with_passthrough(coap_context_t *ctx, struct coap_resource_t *resource, coap_session_t *session, coap_pdu_t *request, coap_binary_t *token, coap_string_t *query,
                                               coap_pdu_t *response) {

    auto fx = functions_.find(resource);
    if (fx != functions_.end()) {
      auto message = create_coap_message(request);
      CoapQuery qry("", std::unique_ptr<CoapMessage, decltype(&free_coap_message)>(message, free_coap_message));
      // call the UDF
      auto udfResponse = fx->second(std::move(qry));
      response = coap_pdu_init(COAP_MESSAGE_CON, COAP_RESPONSE_CODE(udfResponse.getCode()), coap_new_message_id(session), udfResponse.getSize() + 1);
      coap_add_data(response, udfResponse.getSize(), udfResponse.getData());
      if (coap_send(session, response) == COAP_INVALID_TID) {
        printf("error while returning response");
      }

    }

  }

  std::future<uint64_t> future;
  std::atomic<bool> running_;
  std::string hostname_;
  CoapServerContext *server_;
  static std::map<coap_resource_t*, std::function<CoapResponse(CoapQuery)>> functions_;
  std::map<std::string, CoapEndpoint*> endpoints_;
  uint16_t port_;
};

} /* namespace coap */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */

#endif /* EXTENSIONS_COAP_SERVER_COAPSERVER_H_ */
